Add implementation of Kafka 0.11 Records #973
Conversation
Kafka supports nullable arrays, and their null value is represented by a length of -1.
Kafka 0.11 introduces a new Record format that replaces Message from the previous versions. The new format allows for Headers which are key-value pairs of application metadata associated with each message.
Kafka 0.11 introduced RecordBatch as a successor to MessageSet. Using the new RecordBatch is required for transactions and idempotent message delivery.
Many request/response structures can contain either RecordBatches or MessageSets depending on the version of Kafka the client is talking to. This changeset implements a sum type that makes it more convenient to work with these structures by abstracting away the type of the records.
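As a rough sketch of what such a union might look like (only `Records` and `recordsType` are visible in the diffs below; the constants and fields here are illustrative, and `MessageSet`, `RecordBatch`, and `packetEncoder` are assumed to come from the existing codebase):

```go
import "fmt"

const (
	legacyRecords  = iota // wraps a pre-0.11 MessageSet
	defaultRecords        // wraps a 0.11 RecordBatch
)

// Records is a sum type over the two wire formats.
type Records struct {
	recordsType int

	msgSet      *MessageSet
	recordBatch *RecordBatch
}

func (r *Records) encode(pe packetEncoder) error {
	switch r.recordsType {
	case legacyRecords:
		return r.msgSet.encode(pe)
	case defaultRecords:
		return r.recordBatch.encode(pe)
	}
	return fmt.Errorf("unknown records type: %v", r.recordsType)
}
```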
This is mostly pretty straightforward, just a couple of things.
My biggest concern is around the varint length calculations. I understand the problem you're facing with a dynamic reserve length for the encoder, but I find the solution itself a bit hard to follow. It also results in prep-encoding the majority of the record-set a couple more times than is strictly needed.
For the prep-encoder, the actual reserve length doesn't matter at all; there should be no need to recurse because counting up the lengths is just addition so the order doesn't matter.
For the real-encoder, the prep-encoder has already run so you should be able to know the appropriate reserve length at that point?
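For illustration, a prep pass of that sort only needs to add up byte counts, roughly like this (a simplified sketch, not the actual sarama prepEncoder):

```go
// Simplified sketch: a prep pass only accumulates lengths, so the order in
// which fields are visited never changes the total.
type prepPass struct {
	length int
}

func (p *prepPass) putInt8(in int8)       { p.length++ }
func (p *prepPass) putInt32(in int32)     { p.length += 4 }
func (p *prepPass) putInt64(in int64)     { p.length += 8 }
func (p *prepPass) putRawBytes(in []byte) { p.length += len(in) }
```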
@@ -79,7 +79,7 @@ func (rd *realDecoder) getArrayLength() (int, error) {
 		rd.off = len(rd.raw)
 		return -1, ErrInsufficientData
 	}
-	tmp := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
+	tmp := int(int32(binary.BigEndian.Uint32(rd.raw[rd.off:])))
Why cast to int32 and then int?
Because we want to interpret it as a signed value, we first convert it to its signed counterpart int32 (int is usually 64 bits on 64-bit platforms, so converting the uint32 directly to int would not sign-extend and the value would stay positive).
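A quick illustration of the difference on a 64-bit platform (standalone snippet, not part of the PR):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// A length of -1 (the null-array marker) on the wire: four 0xff bytes.
	raw := []byte{0xff, 0xff, 0xff, 0xff}

	u := binary.BigEndian.Uint32(raw)
	fmt.Println(int(u))        // 4294967295 — casting uint32 straight to int keeps it positive
	fmt.Println(int(int32(u))) // -1 — going through int32 first preserves the sign
}
```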
record.go
Outdated
	controlMask = 0x20
)

type Header struct {
should we call this RecordHeader to be a bit clearer?
Ok
	return 0, fmt.Errorf("unknown records type: %v", r.recordsType)
}

func (r *Records) isPartial() (bool, error) {
this method (and isControl below) don't have any coverage AFAICT
Added some coverage for them.
Indeed the uncompressed records will be prep-encoded twice. But I didn't find a clean solution (that doesn't involve checking in encode() whether the encoder is real or prep).
I think that sounds like it might be simpler? If …
Actually looking again at the code that wouldn't work without a few more changes.
Hmm, ya. What if …
Added a dynamicPushEncoder interface that extends pushEncoder with an adjustLength method that is called at prepEncoder.pop() time so that it can compute the actual length of the field. Also made varintLengthField implement this method so we can avoid a needless run of prepEncoder for uncompressed records.
So I added this additional interface.
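Pieced together from the diff hunks in this review, the new interface looks roughly like this (doc comments paraphrased; see the actual diff for the exact wording):

```go
// dynamicPushEncoder is for fields whose reserved length cannot be known
// up front, e.g. varint-encoded lengths.
type dynamicPushEncoder interface {
	pushEncoder

	// Called during pop() to adjust the length of the field.
	// Returns the difference between the reserved length and the
	// actual length, not the new length itself.
	adjustLength(currOffset int) int
}
```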
I like this approach better, thanks. Just some minor stuff now.
length_field.go
Outdated
}

func (l *varintLengthField) run(curOffset int, buf []byte) error {
	if !l.adjusted {
I don't think this is necessary, it will already fail in a pretty obvious way if this mistake is made.
I can remove it, but I think the failure won't be too obvious (it will have length 0, which could happen in other scenarios as well).
length_field.go
Outdated
@@ -31,22 +31,43 @@ func (l *lengthField) check(curOffset int, buf []byte) error {
 type varintLengthField struct {
 	startOffset int
 	length      int64
 	adjusted    bool
 	size        int
Having both size and length fields is going to cause confusion. Isn't size always calculable? Should we just put that logic in reserveLength() and call it as needed?
size could always be calculated, but it involves encoding the varint into a buffer, so it's not very cheap.
It looks pretty cheap to me? The buffer doesn't escape so it will be allocated on the stack, and the actual encoding itself is just a couple of bit-ops in a very short for-loop.
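For reference, computing the encoded size on demand could look like this (a sketch using the standard library; the PR may do it differently, and `varintSize` is just an illustrative name):

```go
import "encoding/binary"

// varintSize returns how many bytes v occupies as a zig-zag varint.
// The buffer never escapes, so it lives on the stack and the call is cheap.
func varintSize(v int64) int {
	var buf [binary.MaxVarintLen64]byte
	return binary.PutVarint(buf[:], v)
}
```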
packet_encoder.go
Outdated
	pushEncoder

	// Called during pop() to adjust the length of the field.
	adjustLength(currOffset int) int
I'd maybe call this updateLength but it doesn't matter. The description should mention that the return value is the diff though, not the new length.
Actually does it need to return anything? The caller can always just call reserveLength again, and subtract the old reserveLength beforehand if it wants the diff?
That will complicate the caller, since in practice most of the time they will call reserveLength at push time and then again at pop time just to readjust.
The initial call to reserveLength might not return 0 (especially if we go with your suggestion of computing the size on demand). So most of the time, at adjust time, you want the diff.
I will update the comment.
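To make that trade-off concrete, a hypothetical pop-time caller under each design (the method name `popDynamic` and fields such as `pe.length` are illustrative only, not the PR's actual code):

```go
// Hypothetical sketch of the two designs being discussed.
func (pe *prepEncoder) popDynamic(dpe dynamicPushEncoder) {
	// (a) adjustLength returns the diff directly (the approach in this PR):
	pe.length += dpe.adjustLength(pe.length)

	// (b) if adjustLength returned nothing, the caller would have to
	// bracket it with two reserveLength() calls to recover the diff:
	//
	//	before := dpe.reserveLength()
	//	dpe.adjustLength(pe.length)
	//	pe.length += dpe.reserveLength() - before
}
```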
record.go
Outdated
	Headers []*RecordHeader

	length      varintLengthField
	totalLength int
I believe this is unused now.
record.go
Outdated
			return 0, err
		}
	}
	return int(r.length.length) + r.length.size, nil
It would be simpler and more reliable to just always prep-encode and then ask the encoder. Otherwise this is going to bite people who e.g. add a header or something.
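In other words, something along these lines (hypothetical sketch; `getTotalLength` and the exact fields are not necessarily the PR's final code):

```go
// Hypothetical: always measure the record by running the prep pass over it,
// instead of caching a length that can go stale when the record changes.
func (r *Record) getTotalLength() (int, error) {
	var prep prepEncoder
	if err := r.encode(&prep); err != nil {
		return 0, err
	}
	return prep.length, nil
}
```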
record_batch.go
Outdated
	case CompressionNone:
		re = pe
	case CompressionGZIP, CompressionLZ4, CompressionSnappy:
		if err := b.computeRecordsLength(); err != nil {
This block here is doing basically the same thing as the global encode(encoder) ([]byte, err) method, i.e. using a prep-encoder to calculate the length and then making a byte array and using a real-encoder to encode it?
You're right, I can reuse that.
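For context, the global helper being referred to follows roughly this prep-then-real pattern (a paraphrase, so treat the details as approximate):

```go
// encode runs the prep pass to size the buffer, then the real pass to fill it.
func encode(e encoder) ([]byte, error) {
	if e == nil {
		return nil, nil
	}

	var prepEnc prepEncoder
	var realEnc realEncoder

	if err := e.encode(&prepEnc); err != nil {
		return nil, err
	}

	realEnc.raw = make([]byte, prepEnc.length)
	if err := e.encode(&realEnc); err != nil {
		return nil, err
	}

	return realEnc.raw, nil
}
```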
It seems that …
I can retrigger CI as needed; I've never been able to figure out how to let contributors do that without giving them full access to everything.
packet_encoder.go
Outdated
@@ -50,3 +50,14 @@ type pushEncoder interface {
 	// of data to the saved offset, based on the data between the saved offset and curOffset.
 	run(curOffset int, buf []byte) error
 }
+
+// dynamicPushEncoder extends the interface of pushEncoder for uses cases where the length of the
+// fields itself is unknown until its value was computed (for instance varint encoded lenght
typo s/lenght/length
record_batch.go
Outdated
		return pe.pop()
	}

	var raw []byte
if you move this whole block (down to line ~126) into a method, and invert the check on b.compressedRecords, you can get rid of lines 77-85 which are duplicates of 127-135.
Maybe I should add a …
dynamicPushDecoder extends pushDecoder for cases where the field has variable length. Also, changed varintLengthField to make use of the new interface.
This is really nice, thanks!
This PR introduces an implementation of the new Record and RecordBatch formats from Kafka 0.11. It also introduces a union type to deal with requests/responses that can contain either records or messages.
Issue #901